{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "[![AWS SDK for pandas](_static/logo.png \"AWS SDK for pandas\")](https://github.com/aws/aws-sdk-pandas)\n",
    "\n",
    "# 15 - EMR"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "import boto3\n",
    "\n",
    "import awswrangler as wr"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Enter your bucket name:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [
    {
     "name": "stdin",
     "output_type": "stream",
     "text": [
      " ··········································\n"
     ]
    }
   ],
   "source": [
    "import getpass\n",
    "\n",
    "bucket = getpass.getpass()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Enter your Subnet ID:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [
    {
     "name": "stdin",
     "output_type": "stream",
     "text": [
      " ························\n"
     ]
    }
   ],
   "source": [
    "subnet = getpass.getpass()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Creating EMR Cluster"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [],
   "source": [
    "cluster_id = wr.emr.create_cluster(subnet)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Uploading our PySpark script to Amazon S3"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [],
   "source": [
    "script = \"\"\"\n",
    "from pyspark.sql import SparkSession\n",
    "spark = SparkSession.builder.appName(\"docker-awswrangler\").getOrCreate()\n",
    "sc = spark.sparkContext\n",
    "\n",
    "print(\"Spark Initialized\")\n",
    "\"\"\"\n",
    "\n",
    "_ = boto3.client(\"s3\").put_object(Body=script, Bucket=bucket, Key=\"test.py\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Submit PySpark step"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [],
   "source": [
    "step_id = wr.emr.submit_step(cluster_id, command=f\"spark-submit s3://{bucket}/test.py\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Wait Step"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {},
   "outputs": [],
   "source": [
    "while wr.emr.get_step_state(cluster_id, step_id) != \"COMPLETED\":\n",
    "    pass"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Terminate Cluster"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {},
   "outputs": [],
   "source": [
    "wr.emr.terminate_cluster(cluster_id)"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3.9.14",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.9.14"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
